Power the TaskPool with a sync channel
authorAlex Crichton <alex@alexcrichton.com>
Tue, 29 Jul 2014 04:23:48 +0000 (21:23 -0700)
committerAlex Crichton <alex@alexcrichton.com>
Sat, 2 Aug 2014 07:16:19 +0000 (00:16 -0700)
This rate limits the jobs entering the pool to ensure that the number of
pending jobs is actually close to the number of cores. The current behavior is
to print all pending jobs, even when they're not executing, which may be
confusing.

src/cargo/util/pool.rs

index 2b3697a414cd578c65b36b25ff4347fbd0322962..1dc2d73451fbe4a355b386afeaf388c3e223fd86 100644 (file)
@@ -8,54 +8,35 @@
 use std::sync::{Arc, Mutex};
 
 pub struct TaskPool {
-    state: Arc<Mutex<State>>,
+    tx: SyncSender<proc():Send>,
 }
 
-struct State { done: bool, jobs: Vec<proc():Send> }
-
 impl TaskPool {
     pub fn new(tasks: uint) -> TaskPool {
         assert!(tasks > 0);
+        let (tx, rx) = sync_channel(tasks);
 
-        let state = Arc::new(Mutex::new(State {
-            done: false,
-            jobs: Vec::new(),
-        }));
+        let state = Arc::new(Mutex::new(rx));
 
         for _ in range(0, tasks) {
-            let myjobs = state.clone();
-            spawn(proc() worker(&*myjobs));
+            let state = state.clone();
+            spawn(proc() worker(&*state));
         }
 
-        return TaskPool { state: state };
-
-        fn worker(mystate: &Mutex<State>) {
-            let mut state = mystate.lock();
-            while !state.done {
-                match state.jobs.pop() {
-                    Some(job) => {
-                        drop(state);
-                        job();
-                        state = mystate.lock();
-                    }
-                    None => state.cond.wait(),
+        return TaskPool { tx: tx };
+
+        fn worker(rx: &Mutex<Receiver<proc():Send>>) {
+            loop {
+                let job = rx.lock().recv_opt();
+                match job {
+                    Ok(job) => job(),
+                    Err(..) => break,
                 }
             }
         }
     }
 
     pub fn execute(&self, job: proc():Send) {
-        let mut state = self.state.lock();
-        state.jobs.push(job);
-        state.cond.signal();
-    }
-}
-
-impl Drop for TaskPool {
-    fn drop(&mut self) {
-        let mut state = self.state.lock();
-        state.done = true;
-        state.cond.broadcast();
-        drop(state);
+        self.tx.send(job);
     }
 }